Skip to content

19.1 Agent 架构深度解析

🎯 本节目标

深入理解 DeepAnalyze 自研 Agent 框架的核心实现,包括迭代推理循环、代码执行机制和消息处理流程。


🔄 核心:迭代推理系统

DeepAnalyzeVLLM 类结构

DeepAnalyze 的核心是 DeepAnalyzeVLLM 类,它实现了一个最多 30 轮的迭代推理循环

python
class DeepAnalyzeVLLM:
    """DeepAnalyze 的核心 Agent 实现"""

    def __init__(self, checkpoint_path, api_base="http://localhost:8000"):
        self.model_name = checkpoint_path
        self.api_base = api_base
        self.max_rounds = 30  # 最大迭代轮次

    def generate(self, prompt: str, workspace: str) -> dict:
        """
        主入口:执行数据分析任务

        Args:
            prompt: 用户的分析请求
            workspace: 工作目录(包含数据文件)

        Returns:
            包含推理过程和最终答案的字典
        """
        messages = [{"role": "user", "content": prompt}]

        for round_num in range(self.max_rounds):
            # 1. 调用 vLLM API 获取响应
            response = self._call_vllm_api(messages)

            # 2. 检测并执行代码块
            if self._has_code_block(response):
                code_blocks = self._extract_code_blocks(response)
                for code in code_blocks:
                    result = self._execute_code(code, workspace)
                    messages.append({
                        "role": "execute",
                        "content": f"<Execute>{result}</Execute>"
                    })

            # 3. 检查是否完成
            if self._has_answer(response):
                return {
                    "reasoning": messages,
                    "answer": self._extract_answer(response)
                }

            # 4. 将响应添加到消息历史
            messages.append({"role": "assistant", "content": response})

        return {"reasoning": messages, "answer": "Max rounds reached"}

与 LangGraph 的架构对比

DeepAnalyze 架构                    LangGraph 架构
─────────────────                   ──────────────
     ┌─────┐                            ┌─────┐
     │User │                            │START│
     └──┬──┘                            └──┬──┘
        │                                  │
        ▼                                  ▼
   ┌────────┐                        ┌──────────┐
   │  LLM   │◄──┐                    │  Agent   │◄──┐
   └────┬───┘   │                    └────┬─────┘   │
        │       │                         │         │
        ▼       │                         ▼         │
   ┌────────┐   │                    ┌──────────┐   │
   │Extract │   │                    │ Condition│   │
   │ Code   │   │                    │   Edge   │   │
   └────┬───┘   │                    └────┬─────┘   │
        │       │                    ┌────┴────┐    │
        ▼       │                    │         │    │
   ┌────────┐   │               ┌────▼───┐ ┌───▼──┐ │
   │Execute │───┘               │ Tools  │ │ END  │ │
   └────────┘                   └────┬───┘ └──────┘ │
        │                            │              │
   ┌────▼────┐                       └──────────────┘
   │ Answer? │
   └────┬────┘
    yes │ no
        │  └────────────────────┐
        ▼                       │
   ┌─────────┐                  │
   │  Return │                  │
   └─────────┘                  │
                                └───► 继续循环

🏷️ 标签系统详解

DeepAnalyze 使用自定义 XML 标签来结构化 LLM 的输出:

标签类型

标签用途触发动作
<Analyze>分析过程记录
<Understand>数据理解阶段
<Code>Python 代码块触发代码执行
<Execute>执行结果由系统生成
<Answer>最终答案触发终止循环

标签提取实现

python
import re

def extract_code_blocks(response: str) -> list[str]:
    """
    从响应中提取所有 <Code> 标签内的代码

    支持两种格式:
    1. <Code>python code</Code>
    2. <Code>```python
       code
       ```</Code>
    """
    pattern = r"<Code>(.*?)</Code>"
    matches = re.findall(pattern, response, re.DOTALL)

    code_blocks = []
    for match in matches:
        # 处理 markdown 代码块格式
        if match.strip().startswith("```"):
            # 移除 ```python 和 ``` 标记
            code = re.sub(r"```\w*\n?", "", match)
            code = code.strip()
        else:
            code = match.strip()
        code_blocks.append(code)

    return code_blocks


def extract_answer(response: str) -> str:
    """提取 <Answer> 标签内的内容"""
    pattern = r"<Answer>(.*?)</Answer>"
    match = re.search(pattern, response, re.DOTALL)
    return match.group(1).strip() if match else ""


def has_answer(response: str) -> bool:
    """检查是否包含 <Answer> 标签"""
    return "<Answer>" in response

标签设计的优缺点

优点:

  • ✅ 简单直观,易于解析
  • ✅ 模型训练时可以强化标签使用
  • ✅ 输出结构化,便于后处理
  • ✅ 无需依赖外部工具调用 API

缺点:

  • ❌ 无法动态注册新工具
  • ❌ 所有"工具"只有代码执行一种
  • ❌ 不如 function calling 灵活
  • ❌ 需要模型训练来学习标签使用

🖥️ 代码执行沙箱

执行机制

DeepAnalyze 使用 subprocess 在隔离环境中执行代码:

python
import subprocess
import tempfile
import os

def execute_code(code: str, workspace: str, timeout: int = 120) -> str:
    """
    在沙箱环境中执行 Python 代码

    Args:
        code: 要执行的 Python 代码
        workspace: 工作目录
        timeout: 超时时间(秒)

    Returns:
        执行结果(stdout + stderr)
    """
    # 创建临时文件
    with tempfile.NamedTemporaryFile(
        mode='w',
        suffix='.py',
        dir=workspace,
        delete=False
    ) as f:
        f.write(code)
        temp_file = f.name

    try:
        # 设置环境变量(无头模式绘图)
        env = os.environ.copy()
        env['MPLBACKEND'] = 'Agg'

        # 执行代码
        result = subprocess.run(
            ['python', temp_file],
            cwd=workspace,
            capture_output=True,
            text=True,
            timeout=timeout,
            env=env
        )

        # 合并 stdout 和 stderr
        output = result.stdout
        if result.stderr:
            output += f"\n[STDERR]\n{result.stderr}"

        return output

    except subprocess.TimeoutExpired:
        return f"[ERROR] Execution timeout after {timeout} seconds"

    except Exception as e:
        return f"[ERROR] {type(e).__name__}: {str(e)}"

    finally:
        # 清理临时文件
        if os.path.exists(temp_file):
            os.remove(temp_file)

异步执行版本

python
import asyncio

async def execute_code_async(
    code: str,
    workspace: str,
    timeout: int = 120
) -> str:
    """异步代码执行,用于 API 服务"""
    with tempfile.NamedTemporaryFile(
        mode='w',
        suffix='.py',
        dir=workspace,
        delete=False
    ) as f:
        f.write(code)
        temp_file = f.name

    try:
        process = await asyncio.create_subprocess_exec(
            'python', temp_file,
            cwd=workspace,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env={**os.environ, 'MPLBACKEND': 'Agg'}
        )

        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=timeout
            )
            output = stdout.decode()
            if stderr:
                output += f"\n[STDERR]\n{stderr.decode()}"
            return output

        except asyncio.TimeoutError:
            process.terminate()
            return f"[ERROR] Execution timeout"

    finally:
        if os.path.exists(temp_file):
            os.remove(temp_file)

安全考虑

代码执行安全措施:
├── 进程隔离(subprocess)
├── 超时限制(默认 120 秒)
├── 工作目录隔离
├── 环境变量控制
└── ⚠️ 注意:无完整沙箱(如 Docker)

生产环境建议:
├── 使用 Docker 容器隔离
├── 限制网络访问
├── 限制文件系统访问
├── 设置资源限制(CPU/内存)
└── 实施代码审查

📨 消息处理流程

消息格式规范化

python
def normalize_message_content(message: dict) -> str:
    """
    将 OpenAI 格式的消息内容规范化为纯文本

    OpenAI 格式支持:
    - 字符串内容: "Hello"
    - 列表内容: [{"type": "text", "text": "Hello"}]
    """
    content = message.get("content", "")

    if isinstance(content, str):
        return content

    if isinstance(content, list):
        texts = []
        for item in content:
            if isinstance(item, dict) and item.get("type") == "text":
                texts.append(item.get("text", ""))
        return "\n".join(texts)

    return str(content)

构建 vLLM 消息

python
def prepare_vllm_messages(
    user_query: str,
    workspace_files: list[str]
) -> list[dict]:
    """
    构建发送给 vLLM 的消息列表

    消息结构:
    # Instruction
    <用户查询>

    # Data
    <工作目录中的文件列表>
    """
    instruction = f"# Instruction\n{user_query}"

    if workspace_files:
        data_section = "# Data\n" + "\n".join(
            f"- {f}" for f in workspace_files
        )
        content = f"{instruction}\n\n{data_section}"
    else:
        content = instruction

    return [{"role": "user", "content": content}]

📊 报告生成机制

从对话到报告

python
def generate_report(messages: list[dict]) -> str:
    """
    从对话历史生成 Markdown 报告

    报告结构:
    1. 主体:最终 <Answer> 的内容
    2. 附录:完整的对话历史(所有标签内容)
    """
    report_parts = []
    conversation_log = []

    for msg in messages:
        content = msg.get("content", "")
        role = msg.get("role", "")

        # 提取各类标签
        for tag in ["Analyze", "Understand", "Code", "Execute", "Answer"]:
            pattern = f"<{tag}>(.*?)</{tag}>"
            matches = re.findall(pattern, content, re.DOTALL)
            for match in matches:
                conversation_log.append(f"### {tag}\n{match.strip()}")

        # 提取最终答案作为报告主体
        if "<Answer>" in content:
            answer = extract_answer(content)
            report_parts.insert(0, answer)

    # 组装报告
    report = "\n\n".join(report_parts)
    report += "\n\n---\n\n## 附录:完整对话历史\n\n"
    report += "\n\n".join(conversation_log)

    return report

🔌 vLLM API 集成

API 调用封装

python
import requests

class VLLMClient:
    """vLLM API 客户端"""

    def __init__(self, base_url: str = "http://localhost:8000"):
        self.base_url = base_url.rstrip("/")

    def chat_completion(
        self,
        messages: list[dict],
        model: str = "deepanalyze-8b",
        temperature: float = 0.7,
        max_tokens: int = 4096
    ) -> str:
        """
        调用 vLLM 的 chat/completions 端点
        """
        response = requests.post(
            f"{self.base_url}/v1/chat/completions",
            json={
                "model": model,
                "messages": messages,
                "temperature": temperature,
                "max_tokens": max_tokens
            }
        )
        response.raise_for_status()

        data = response.json()
        return data["choices"][0]["message"]["content"]

流式响应支持

python
def stream_chat_completion(
    self,
    messages: list[dict],
    model: str = "deepanalyze-8b"
) -> Iterator[str]:
    """流式生成响应"""
    response = requests.post(
        f"{self.base_url}/v1/chat/completions",
        json={
            "model": model,
            "messages": messages,
            "stream": True
        },
        stream=True
    )

    for line in response.iter_lines():
        if line:
            line = line.decode("utf-8")
            if line.startswith("data: "):
                data = json.loads(line[6:])
                if data != "[DONE]":
                    delta = data["choices"][0].get("delta", {})
                    if "content" in delta:
                        yield delta["content"]

🎨 完整工作流示例

python
# 1. 初始化客户端
deepanalyze = DeepAnalyzeVLLM(
    checkpoint_path="/models/deepanalyze-8b/",
    api_base="http://localhost:8000"
)

# 2. 准备工作目录
workspace = "/data/analysis/"
# 目录中包含: person.csv, enrolled.csv, etc.

# 3. 发送分析请求
prompt = """
请分析工作目录中的学生贷款数据,生成一份包含以下内容的报告:
1. 数据概览和统计摘要
2. 贷款违约率分析
3. 关键风险因素识别
4. 可视化图表
5. 结论和建议
"""

# 4. 执行分析
result = deepanalyze.generate(prompt, workspace)

# 5. 获取结果
print("推理过程:", result["reasoning"])
print("最终答案:", result["answer"])

💡 核心洞察

DeepAnalyze Agent 的设计哲学

  1. 简单即美德

    • 使用简单的 for 循环而非复杂的图结构
    • 标签系统比 function calling 更直观
  2. 专注于领域

    • 只有"代码执行"一种工具
    • 但这个工具足够强大,可以做任何数据分析
  3. 训练优于工程

    • 通过训练让模型学会使用标签
    • 而非通过复杂的提示工程
  4. 端到端控制

    • 从训练到推理全链路可控
    • 无第三方依赖的黑盒

与 LangGraph 的本质区别

LangGraph:
├── "声明式" - 定义图结构,框架执行
├── "通用" - 支持任意工具和 LLM
├── "工程" - 通过 Prompt 和工具集成实现能力
└── "灵活" - 运行时可修改行为

DeepAnalyze:
├── "命令式" - 直接编写执行逻辑
├── "专用" - 只支持自己的模型
├── "训练" - 通过训练实现能力
└── "固定" - 行为由模型权重决定

接下来: 19.2 训练范式与数据

基于 MIT 许可证发布。内容版权归作者所有。